JDK7的新特性1 ForkJoinPool
JDK7的一个新特性:ForkJoinPool 很值得一说。
ForkJoinPool,首先上代码:
//计算 π 的值有一个通过多项式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多项式的项数越多,计算出的 π 的值越精确
static class PiEstimateTask extends RecursiveTask<Double> {
private static final long serialVersionUID = 1L;
private final long begin;
private final long end;
private final long threshold; // 分割任务的临界值
public PiEstimateTask(long begin, long end, long threshold) {
this.begin = begin;
this.end = end;
this.threshold = threshold;
}
@Override
protected Double compute() { // 实现 compute 方法
if (end - begin <= threshold) { // 临界值之下,不再分割,直接计算
int sign; // 符号,多项式中偶数位取 1,奇数位取 -1(位置从 0 开始)
double result = 0.0;
for (long i = begin; i < end; i++) {
sign = (i & 1) == 0 ? 1 : -1;
result += sign / (i * 2.0 + 1);
}
System.out.println(Thread.currentThread()+ " From: "+ begin + " End: "+ end);
return result * 4;
}
// 分割任务
long middle = (begin + end) / 2;
PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);
leftTask.fork(); // 异步执行 leftTask
rightTask.fork(); // 异步执行 rightTask
double leftResult = leftTask.join(); // 阻塞,直到 leftTask 执行完毕返回结果
double rightResult = rightTask.join(); // 阻塞,直到 rightTask 执行完毕返回结果
return leftResult + rightResult; // 合并结果
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_00);
pool.execute(task);
System.out.println(task.get());
}
从上面的代码中,我们可以看到ForkJoinPool,适用的一类业务: 把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
ForkJoinPool支持的任务大类有两种,一类是:Runnable ,类似于ExecutorService。一类是ForkJoinTask。
其中的ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
代码分析
根据上面的示例代码,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。我们可以根据函数名假设一下 fork() 和 join() 的作用:
fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。 join():等待该任务的处理线程处理完毕,获得返回值。
并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。
具体的数据结构如下图:
Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做 work stealing 算法。
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
fork() :
做的工作只有一件事,既是把任务推入当前工作线程的工作队列里。
join() :
工作则复杂得多,也是 join() 可以使得线程免于被阻塞的原因——不像同名的 Thread.join()。
- 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
- 查看任务的完成状态,如果已经完成,直接返回结果。
- 如果任务尚未完成,但处于自己的工作队列内,则完成它。
- 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
- 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。 递归地执行第5步。
所谓work-stealing模式,即每个工作线程都会有自己的任务队列。当工作线程完成了自己所有的工作后,就会去“偷”别的工作线程的任务。
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
submit():
其实除了前面介绍过的每个工作线程自己拥有的工作队列以外,ForkJoinPool 自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工作队列被称为 submitting queue 。
submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。
更加详细的研究:http://gee.cs.oswego.edu/dl/papers/fj.pdf
ForkJoinPool与ThreadPoolExecutor区别:
1.ForkJoinPool中的每个线程都会有一个队列,而ThreadPoolExecutor只有一个队列,并根据queue类型不同,细分出各种线程池
2.ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,ThreadPoolExecutor中根本没有什么父子关系任务
3.ForkJoinPool在使用过程中,会创建大量的子任务,会进行大量的gc,但是ThreadPoolExecutor不需要,因此单线程(或者任务分配平均)
4.ForkJoinPool在多任务,且任务分配不均是有优势,但是在单线程或者任务分配均匀的情况下,效率没有ThreadPoolExecutor高,毕竟要进行大量gc子任务
ForkJoinPool在多线程情况下,能够实现工作窃取(Work Stealing),在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行。
ThreadPoolExecutor因为它其中的线程并不会关注每个任务之间任务量的差异。当执行任务量最小的任务的线程执行完毕后,它就会处于空闲的状态(Idle),等待任务量最大的任务执行完毕。
因此多任务在多线程中分配不均时,ForkJoinPool效率高。
stream中应用ForkJoinPool
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
parallelStream让部分Java代码自动地以并行的方式执行
最后:
有一点要注意,就是手动设置ForkJoinPool的线程数量时,实际线程数为设置的线程数+1,因为还有一个main主线程
即使将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工作线程。因此线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的。
与ForkJoinPool对应的是CompletableFuture
Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。 阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果
CompletableFuture就是利用观察者设计模式当计算结果完成及时通知监听者 在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
- 上一篇 LeetCode18, 19,20
- 下一篇 并发知识梳理:概述